<?php
namespace app\common\mq;

require_once 'cmq/cmq_api.php';
require_once CMQAPI_ROOT_PATH . '/account.php';
require_once CMQAPI_ROOT_PATH . '/queue.php';
require_once CMQAPI_ROOT_PATH . '/cmq_exception.php';
require_once CMQAPI_ROOT_PATH . '/topic.php';
require_once CMQAPI_ROOT_PATH . '/subscription.php';
use CMQExceptionBase;
use TencentCloud\Common\Credential;
use TencentCloud\Common\Exception\TencentCloudSDKException;
use TencentCloud\Common\Profile\ClientProfile;
use TencentCloud\Common\Profile\HttpProfile;
use TencentCloud\Tdmq\V20200217\TdmqClient;

/**
 * Thin wrapper around the Tencent Cloud CMQ SDK objects (account / queue / topic).
 *
 * Error contract shared by the public methods below (kept for backward
 * compatibility with existing callers):
 *  - `false` is returned when a method's own argument validation fails;
 *  - the exception message (a string) is returned when the SDK throws
 *    a CMQExceptionBase;
 *  - otherwise the SDK result (or `1` for delete operations) is returned.
 * Callers therefore need to distinguish a string error from a real result.
 */
class CMQ
{
    /**
     * Upper bound applied to the number of messages requested per batch receive.
     */
    const MAX_BATCH_SIZE = 16;

    /**
     * TDMQ client built in the constructor. No method in this class reads it;
     * it is kept because constructing it is part of the existing behavior.
     */
    private $client;

    /**
     * Legacy CMQ account object used to look up queues and topics.
     */
    private $account;

    /**
     * Builds the TDMQ client and the CMQ account from the `cmq.*` environment settings.
     *
     * Reads cmq.secret_id, cmq.secret_key, cmq.queue_endpoint, cmq.region and
     * cmq.endpoint; every setting falls back to an empty string when missing.
     */
    public function __construct()
    {
        $secretId = env('cmq.secret_id', '');
        $secretKey = env('cmq.secret_key', '');
        $queue_endpoint = env('cmq.queue_endpoint', '');
        $region = env('cmq.region', '');
        $credential = new Credential($secretId, $secretKey);
        $httpProfile = new HttpProfile();
        $httpProfile->setEndpoint($queue_endpoint);
        $clientProfile = new ClientProfile();
        $clientProfile->setHttpProfile($httpProfile);
        $this->client = new TdmqClient($credential, $region, $clientProfile);
        $endpoint = env('cmq.endpoint', '');
        $this->account = new \Account($endpoint, $secretId, $secretKey);
    }

    /**
     * Clamps a requested batch size into the range 1..MAX_BATCH_SIZE.
     *
     * @param mixed $num_of_msg requested number of messages
     * @return int batch size that is safe to hand to the SDK
     */
    private function clampBatchSize($num_of_msg)
    {
        return max(1, min(self::MAX_BATCH_SIZE, (int) $num_of_msg));
    }

    /**
     * Sends a single message to a queue.
     * @author 贺强
     * @time   2022/6/7 13:46
     * @param string $queue_name queue name
     * @param string $msg_body   message body / content
     * @return mixed SDK send result, or the error message string on CMQ failure
     */
    public function sendMessage($queue_name, $msg_body)
    {
        try {
            $queue = $this->account->get_queue($queue_name);
            $message = new \Message($msg_body);
            return $queue->send_message($message);
        } catch (CMQExceptionBase $e) {
            return $e->getMessage();
        }
    }

    /**
     * Sends several messages to a queue in one request.
     * @author 贺强
     * @time   2022/6/7 13:56
     * @param string $queue_name queue name
     * @param array  $msg_list   message bodies to send
     * @return mixed false on invalid arguments, SDK result on success,
     *               or the error message string on CMQ failure
     */
    public function batchSendMessage($queue_name, $msg_list)
    {
        if (empty($queue_name) || empty($msg_list) || !is_array($msg_list)) {
            return false;
        }
        try {
            $messages = [];
            foreach ($msg_list as $msg) {
                $messages[] = new \Message($msg);
            }
            $queue = $this->account->get_queue($queue_name);
            return $queue->batch_send_message($messages);
        } catch (CMQExceptionBase $e) {
            return $e->getMessage();
        }
    }

    /**
     * Receives (consumes) one message from a queue, passing 3 as the polling wait.
     * @author 贺强
     * @time   2022/6/7 14:03
     * @param string $queue_name queue name
     * @return mixed SDK receive result, or the error message string on CMQ failure
     */
    public function receiveMessage($queue_name)
    {
        try {
            $queue = $this->account->get_queue($queue_name);
            return $queue->receive_message(3);
        } catch (CMQExceptionBase $e) {
            return $e->getMessage();
        }
    }

    /**
     * Receives up to MAX_BATCH_SIZE messages from a queue, passing 3 as the polling wait.
     * @author 贺强
     * @time   2022/6/7 14:06
     * @param string $queue_name queue name
     * @param int    $num_of_msg number of messages requested (clamped to 1..16)
     * @return mixed false on invalid arguments, SDK result on success,
     *               or the error message string on CMQ failure
     */
    public function batchReceiveMessage($queue_name, $num_of_msg)
    {
        if (empty($queue_name) || empty($num_of_msg)) {
            return false;
        }
        try {
            $queue = $this->account->get_queue($queue_name);
            return $queue->batch_receive_message($this->clampBatchSize($num_of_msg), 3);
        } catch (CMQExceptionBase $e) {
            return $e->getMessage();
        }
    }

    /**
     * Deletes one message from a queue.
     * @author 贺强
     * @time   2022/6/7 14:14
     * @param string $queue_name     queue name
     * @param string $receipt_handle receipt handle of the message to delete
     * @return mixed 1 on success, or the error message string on CMQ failure
     */
    public function deleteMessage($queue_name, $receipt_handle)
    {
        try {
            $queue = $this->account->get_queue($queue_name);
            $queue->delete_message($receipt_handle);
            return 1;
        } catch (CMQExceptionBase $e) {
            return $e->getMessage();
        }
    }

    /**
     * Deletes several messages from a queue in one request.
     * @author 贺强
     * @time   2022/6/7 14:20
     * @param string     $queue_name          queue name
     * @param array|null $receipt_handle_list receipt handles to delete
     * @return mixed false on invalid arguments, 1 on success,
     *               or the error message string on failure
     */
    public function batchDeleteMessage($queue_name, $receipt_handle_list = null)
    {
        // Same validation style as batchSendMessage: nothing to delete is a caller error.
        if (empty($queue_name) || empty($receipt_handle_list) || !is_array($receipt_handle_list)) {
            return false;
        }
        try {
            $queue = $this->account->get_queue($queue_name);
            $queue->batch_delete_message($receipt_handle_list);
            return 1;
        } catch (CMQExceptionBase $e) {
            // The queue object comes from the legacy CMQ SDK, like every other
            // method in this class, so this is the exception type to handle.
            return $e->getMessage();
        } catch (TencentCloudSDKException $e) {
            // Kept so callers relying on the previous catch still get a string.
            return $e->getMessage();
        }
    }

    /**
     * Publishes one message to a topic.
     * @author 贺强
     * @time   2022/6/7 15:17
     * @param array $param publish parameters: topic_name and message are
     *                     required; tags and keys are optional (default null)
     * @return mixed false on invalid arguments, SDK result on success,
     *               or the error message string on CMQ failure
     */
    public function publishMessage($param)
    {
        if (empty($param['topic_name']) || empty($param['message'])) {
            return false;
        }
        try {
            $topic_name = $param['topic_name'];
            $message = $param['message'];
            $tags = $param['tags'] ?? null;
            $keys = $param['keys'] ?? null;
            $topic = $this->account->get_topic($topic_name);
            return $topic->publish_message($message, $tags, $keys);
        } catch (CMQExceptionBase $e) {
            return $e->getMessage();
        }
    }

    /**
     * Publishes several messages to a topic in one request.
     * @author 贺强
     * @time   2022/6/7 15:21
     * @param array $param publish parameters: topic_name and message_list
     *                     (array) are required; tags (default null) and
     *                     keys (default '') are optional
     * @return mixed false on invalid arguments, SDK result on success,
     *               or the error message string on CMQ failure
     */
    public function batchPublishMessage($param)
    {
        if (empty($param['topic_name']) || empty($param['message_list']) || !is_array($param['message_list'])) {
            return false;
        }
        try {
            $topic_name = $param['topic_name'];
            $vmessageList = $param['message_list'];
            $tags = $param['tags'] ?? null;
            $keys = $param['keys'] ?? '';
            $topic = $this->account->get_topic($topic_name);
            return $topic->batch_publish_message($vmessageList, $tags, $keys);
        } catch (CMQExceptionBase $e) {
            return $e->getMessage();
        }
    }

    /**
     * Receives one message from the queue named in $param.
     *
     * Only queue_name and polling_wait_seconds are used. Other keys such as
     * visibility_timeout, max_msg_size or msg_retention_seconds were previously
     * copied into a QueueMeta object that was never passed to the SDK, so they
     * had (and still have) no effect on the receive call.
     * @author 贺强
     * @time   2022/6/13 16:42
     * @param array $param parameters: queue_name is required;
     *                     polling_wait_seconds is optional (default 3)
     * @return mixed false on invalid arguments, SDK receive result on success,
     *               or the error message string on CMQ failure
     */
    public function receiveTopicMessage($param)
    {
        if (empty($param['queue_name'])) {
            return false;
        }
        try {
            $polling_wait_seconds = $param['polling_wait_seconds'] ?? 3;
            $queue = $this->account->get_queue($param['queue_name']);
            return $queue->receive_message($polling_wait_seconds);
        } catch (CMQExceptionBase $e) {
            return $e->getMessage();
        }
    }

    /**
     * Receives up to MAX_BATCH_SIZE messages from a queue, without passing a
     * polling wait (the SDK's own default applies).
     * @author 贺强
     * @time   2022/6/13 20:51
     * @param string $queue_name queue name
     * @param int    $num_of_msg number of messages requested (clamped to 1..16)
     * @return mixed false on invalid arguments, SDK result on success,
     *               or the error message string on CMQ failure
     */
    public function batchReceiveTopicMessage($queue_name, $num_of_msg)
    {
        if (empty($queue_name) || empty($num_of_msg)) {
            return false;
        }
        try {
            $queue = $this->account->get_queue($queue_name);
            return $queue->batch_receive_message($this->clampBatchSize($num_of_msg));
        } catch (CMQExceptionBase $e) {
            return $e->getMessage();
        }
    }
}
